package com.wsjj.yjh.watermark;

import com.wsjj.yjh.WaterSensor;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * Demo job: event-time tumbling windows driven by a monotonous-timestamp watermark strategy.
 *
 * <p>The job reads text lines from a socket, parses each line as {@code id,ts,vc} into a
 * {@link WaterSensor}, assigns event timestamps from {@code ts}, keys the stream by sensor id and
 * sums {@code vc} per 10-second tumbling event-time window. One formatted line is printed for
 * every fired window.
 */
public class WatermarkMonoDemo {

    /**
     * Builds and runs the streaming pipeline.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(4);
        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);

        // Each input line is expected to look like "id,ts,vc"; a malformed line makes the
        // parsing below throw and therefore fails the task.
        DataStreamSource<String> source = env.socketTextStream("hadoop102", 7777);
        SingleOutputStreamOperator<WaterSensor> sensors = source.map(value -> {
            String[] fields = value.split(",");
            return new WaterSensor(fields[0], Long.valueOf(fields[1]), Integer.valueOf(fields[2]));
        });

        SingleOutputStreamOperator<WaterSensor> timestamped = sensors.assignTimestampsAndWatermarks(
                WatermarkStrategy.<WaterSensor>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
                            @Override
                            public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                                // Debug trace: the element and the timestamp previously attached to the record.
                                System.out.println(element.toString() + "<----->" + recordTimestamp);
                                // NOTE(review): ts is presumably in seconds; Flink expects epoch milliseconds.
                                return element.getTs() * 1000L;
                            }
                        }));

        KeyedStream<WaterSensor, String> keyed = timestamped.keyBy(WaterSensor::getId);
        WindowedStream<WaterSensor, String, TimeWindow> windowed =
                keyed.window(TumblingEventTimeWindows.of(Time.seconds(10)));

        SingleOutputStreamOperator<String> result = windowed.aggregate(
                new AggregateFunction<WaterSensor, Integer, String>() {
                    @Override
                    public Integer createAccumulator() {
                        return 0;
                    }

                    @Override
                    public Integer add(WaterSensor value, Integer accumulator) {
                        return value.getVc() + accumulator;
                    }

                    @Override
                    public String getResult(Integer accumulator) {
                        return accumulator.toString();
                    }

                    @Override
                    public Integer merge(Integer a, Integer b) {
                        // Two partial sums combine by addition. Returning null here would break
                        // any window assigner that merges accumulators (e.g. session windows).
                        return a + b;
                    }
                },
                new WindowFunction<String, String, String, TimeWindow>() {
                    @Override
                    public void apply(String key, TimeWindow timeWindow, Iterable<String> input,
                                      Collector<String> out) throws Exception {
                        String start = DateFormatUtils.format(timeWindow.getStart(), "yyyy-MM-dd HH:mm:ss.SSS");
                        String end = DateFormatUtils.format(timeWindow.getEnd(), "yyyy-MM-dd HH:mm:ss.SSS");

                        // Count exactly by iterating: Spliterator.estimateSize() is only an estimate
                        // and may report Long.MAX_VALUE when the size is unknown.
                        long num = 0;
                        for (String ignored : input) {
                            num++;
                        }

                        out.collect("窗口：["+start+","+end+")"+",个数："+num+",数据："+input.toString());
                    }
                });

        result.print();
        env.execute();
    }
}
